Dynamic Executor Scaling #224
Conversation
Force-pushed from e33dbb9 to d6b3269.
I realize this is a WIP, so I've left just cursory comments for now.
Was not having this ready check a bug before?
Yes, I think so.
My understanding of the API is that this is supposed to have "fire and forget" semantics: the executors are requested, but there's no guarantee that they are up after they have been requested. The future's value only indicates whether the request was acknowledged, not whether the executors actually started up.
I think you're right. I'm trying to understand if we should then launch each pod and wait for it to reach Running before continuing in this method. Would that make sense? I think the method is called in a way that there aren't several parallel invocations.
I don't think so - we're returning a future here so even though we don't execute this method multiple times in parallel we might be executing multiple instances of the future at once. Thus we should probably return immediately after creating the pods without waiting for them to become ready. We can have a separate monitor / watch that handles when the executor pods fail to launch.
colocatedPods
Done
executor -> shuffle
Done
sbin/kubernetes-shuffle-service.yaml
Outdated
The sbin/ folder usually has just binaries in it; maybe put this in conf/ instead.
Done
Is this the path on the node, on the shuffle pod, or on the executor pod?
This is the path on the executor pod. For now, it's up to the user to ensure that it's consistent with that on the node and the shuffle pod.
I think the rest of Spark uses its local dir instead of the tmpdir for shuffle sans shuffle service?
This logic is very similar to logic farther below -- maybe extract a (Pod => Boolean) predicate function called isPodReady that can be used in both places?
Use Readiness.isReady() from the Kubernetes client library.
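A sketch combining the two suggestions above, assuming the fabric8 Kubernetes client (the Readiness package path varies across client versions):

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.internal.readiness.Readiness

// One predicate shared by both call sites; readiness logic is delegated
// to the client library instead of hand-rolled status checks.
private def isPodReady(pod: Pod): Boolean = Readiness.isReady(pod)

// e.g. val readyPods = candidatePods.filter(isPodReady)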
Use a case class for clarity instead of returning a tuple.
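To illustrate, a minimal sketch with hypothetical names:

// A named result type documents itself at call sites, unlike a tuple
// whose fields are accessed as _1 and _2.
case class ShuffleServiceConfig(shuffleServiceHost: String, shuffleDirs: Seq[String])

// Instead of: def resolveShuffleService(): (String, Seq[String])
def resolveShuffleService(): ShuffleServiceConfig =
  ShuffleServiceConfig("10.0.0.5", Seq("/var/tmp/spark-shuffle-files-0"))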
Done
Use Option.map instead of match here. Using map and getOrElse we can use a more fluent builder pattern. Perhaps something like this:
val basePodBuilder = new PodBuilder().<default configuration>.endSpec()
val resolvedPodBuilder = shuffleDir.map { dir =>
  basePodBuilder.editSpec()
    .addToVolumes(...)
    .editFirstContainer().<edits>.endContainer() // Can also use editMatchingContainer()
    .endSpec()
}.getOrElse(basePodBuilder)
kubernetesClient.pods().create(resolvedPodBuilder.build())
We could also build the container separately from the pod, but I'm not sure how much that buys us. That would look something like this:
val baseContainerBuilder = new ContainerBuilder().<default configurations>
val basePodBuilder = new PodBuilder().<default configuration>.endSpec()
val resolvedPodBuilder = shuffleDir.map { dir =>
  basePodBuilder.editSpec()
    .addToVolumes(...)
    .endSpec()
}.getOrElse(basePodBuilder)
val resolvedContainerBuilder = shuffleDir.map { dir =>
  baseContainerBuilder.addToVolumeMounts(...)
}.getOrElse(baseContainerBuilder)
val resolvedPod = resolvedPodBuilder.editSpec()
  .addToContainers(resolvedContainerBuilder.build())
  .endSpec()
  .build()
Should this command be running the shuffle service? Use bin/spark-class to help here if that's the case.
Fixed
Format seems off
Is the reason for modifying RetrieveSparkAppConfig just to get the executorId? You should already get the executorId with the RegisterExecutor message.
My 2 cents: it would be better to have as few modifications to Spark core as possible.
The reason is that the driver needs to send slightly different config to each executor - each executor gets a different shuffle service pod IP, so the driver needs to differentiate between executors when they fetch config. We could do this at the time of the RegisterExecutor message, but we would need to extend and override parts of CoarseGrainedExecutorBackend.scala. I suspect the changes would be more complex if we did that.
I took a look at the local storage proposal, and I think it helps us with allocating local disk space for shuffle and capping its amount, but I didn't see any mention in the proposal doc of the multiple-mounts scenario. During a shuffle, the executor pods will have a volume mounted that they're writing to, while the shuffle service simultaneously serves up files from that volume to consumers on other nodes. Most likely the two volume mounts will overlap, with the executor pod's volume in a subfolder of the shuffle service pod's volume on the node.
Force-pushed from 4a007e2 to 6742ab2.
conf/kubernetes-shuffle-service.yaml
Outdated
Is this an official image that we'll be supporting?
+1 to @tnachen's question. Maybe add a comment pointing to the Dockerfile below?
Is this a new configuration you are introducing? We will definitely need documentation explaining what it is for.
Also please use the ConfigBuilder from config.scala and its .internal() marker if this is not meant to be set by users and is only used internally.
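For reference, a sketch of that pattern (the value name and doc string here are illustrative):

import org.apache.spark.internal.config.ConfigBuilder

// .internal() keeps the entry out of user-facing documentation; drop it
// if users are expected to set the value themselves.
private[spark] val KUBERNETES_SHUFFLE_SERVICE_HOST =
  ConfigBuilder("spark.shuffle.service.host")
    .internal()
    .doc("Hostname or pod IP of the external shuffle service.")
    .stringConf
    .createOptional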
@ash211 do you think we should add it into spark-core alongside spark.shuffle.service.port, which already exists there?
I used the ConfigBuilder for references to spark.shuffle.service.host within the Kubernetes package, and left it as-is here, conforming to the surrounding code in spark-core.
Fix the formatting - I don't think this is typical Spark style (one arg per line, indented 4 spaces).
Done
Debug only?
Looks like setting it lets you override the IP that the executor thinks its colocated shuffle service is on -- we should put that in the docs.
You should move this back to match Spark style
Ditto on style.
rerun integration test please
I only took a cursory look at the main code. But it seems pretty good as is. Thanks.
conf/kubernetes-shuffle-service.yaml
Outdated
/tmp might not be ideal in many setups; e.g., it may be mounted as a small memfs not large enough for large shuffle data. It might be better to use a unique dir here like spark-shuffle-dir and document that people should prepare the dir if necessary.
Eventually we should perhaps make this configurable by turning this into a Helm chart?
I don't think it would be a Helm chart, since it doesn't deploy an entire application, just a dependent component (and the whole overhead of helm/tiller would be prohibitive). This is only a sample YAML that a user might use; I was thinking of distributing a YAML file that the user can modify according to their needs.
Trying to understand your points.
it doesn't deploy an entire application
Helm can be used for deploying a service, and I consider the Spark shuffle service a service independent of individual Spark jobs.
Or, by "an entire application", do you mean the fact that the docker image may not be what the user wants?
the whole overhead of helm/tiller would be prohibitive
By "overhead", do you mean the "setup steps" such as downloading helm and doing $ helm init? Or do you mean the "runtime" cpu, etc overhead?
If you mean "setup steps", it may not be a lot compared with the alternative of modifying this YAML file on one's own and doing $ kubectl create. At least that was my experience. Another thing to consider is that mucking with YAML can easily lead to syntax errors and hours of troubleshooting; a Helm chart is a black box shielding inexperienced users from that.
Anyway, I wouldn't block this PR for not having Helm. I just think it is a nice way to go for future iterations of this.
Independent of Helm or not, I still think putting /tmp here is not a good example.
One more related point: we may want multiple dirs in this example. Shuffle is seek-intensive, and using multiple disks is quite common. We may want to show how that can be done.
By "overhead", do you mean the "setup steps" such as downloading helm and doing $ helm init? Or do you mean the "runtime" cpu, etc overhead?
Yes. Helm is one package manager, but not necessarily installed on most clusters. It would be excessive to expect a Helm installation just to run a sub-component of Spark, in my opinion.
Independent of helm or not, I still think putting /tmp here is a not a good example.
Fair enough, I'll include a better example with multiple directories then. What do you think would be a good default shuffle directory on hosts? /shuffle-files or something?
Makes sense. You're trying to lower the adoption barrier.
I would think /spark-shuffle-files-0, /spark-shuffle-files-1, etc.
Updating with the result of the SIG meeting discussion: we agreed /var/tmp would be a good default base dir for this. So maybe /var/tmp/spark-shuffle-files-0, /var/tmp/spark-shuffle-files-1, etc.
Undo the formatting change here - IDEs don't handle this well, unfortunately.
Move the dot at the end of line 312 to this line
Use the Client class directly, as using spark-submit manipulates system properties.
I wonder what more we can do for testing here. It would be nice, for example, to verify that reads and writes are happening on the pods at the disk level, and that the shuffle service pod can see what is being written by the executor pod. Have we also confirmed that the test job reads and writes shuffle files? More testing at the disk level will allow us to verify changes we make when considering e.g. isolation. We also should consider testing
Also please use the ConfigBuilder from config.scala and its .internal() marker if this is not meant to be set by users and is only used internally.
I expect this to be set by the user, so remove .internal()
remove .internal()
remove .internal()
When launching, say, 100 executors at the start of a job, with each executor registering with the driver, this will trigger 100 near-simultaneous requests to the apiserver for running shuffle service pods.
Does it make sense instead to keep a list locally that's updated by a Watch, and to refer to that cached local list instead of querying the apiserver each time?
I'm not sure what impact this level of query load has on the apiserver.
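A rough sketch of such a Watch-backed cache, assuming the fabric8 client's classic Watcher API (the label selector and the kubernetesClient reference are assumptions):

import scala.collection.concurrent.TrieMap
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

// Local node -> shuffle pod cache, kept fresh by a single Watch rather
// than one apiserver query per registering executor.
private val shufflePodsByNode = TrieMap.empty[String, Pod]

private val shufflePodsWatch = kubernetesClient.pods()
  .withLabel("app", "spark-shuffle-service") // hypothetical label
  .watch(new Watcher[Pod] {
    override def eventReceived(action: Action, pod: Pod): Unit = action match {
      case Action.ADDED | Action.MODIFIED =>
        shufflePodsByNode.put(pod.getSpec.getNodeName, pod)
      case _ =>
        shufflePodsByNode.remove(pod.getSpec.getNodeName)
    }
    override def onClose(cause: KubernetesClientException): Unit = {}
  })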
I agree with this idea.
That's a good idea. I'll add this caching in
Looks like setting it lets you override the IP that the executor thinks its colocated shuffle service is on -- we should put that in the docs.
something like colocatedPods.map(_.getStatus.getPodIP).mkString(",")
feels like not quite the right match expression here -- how about just _ ?
@mccheah how to match against a 1-item list here? Not sure if it's a Seq or something else at this point
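For reference, a one-item sequence can be matched directly; a minimal sketch, assuming colocatedPods is a Seq[Pod]:

val shuffleServicePod: Option[Pod] = colocatedPods match {
  case Seq(pod) => Some(pod) // exactly one colocated shuffle service pod
  case _ => None // zero or several: treat as not found
}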
Why the nice? Should we do this in the other Dockerfiles too?
I thought 0 was the default?
With #227 merged, each component no longer needs to have its own assembly XML file.
Nit: src/main/docker/shuffle -> src/main/docker/shuffle-service for a tad more clarity.
Force-pushed from b0751cb to 061bc92.
}
totalExpectedExecutors.set(requestedTotal)

allPodsReady
IIUC, doRequestTotalExecutors here should return true unless it fails to send the scaling request to the cluster manager. The return value has nothing to do with whether the executors are ready or not.
Both the YARN and Mesos backend implementations do something like that.
IMO we should not call the k8s API directly inside doRequestTotalExecutors, because the returned Future is awaited by the base class's requestExecutors method with a timeout. The current code could block the dynamic allocation manager thread for a long time, e.g. when there is an intermittent network problem, or cause it to exit if any exception is thrown while calling the k8s API. We should use a worker thread to do it, wrap any possible k8s exception, and return a real Future[Boolean] here.
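A minimal sketch of that suggestion (the pool name is hypothetical; totalExpectedExecutors is the field from the diff above):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

// Run the Kubernetes call on a dedicated worker pool so a slow or failing
// apiserver cannot block the dynamic allocation manager thread.
private val k8sRequestContext = ExecutionContext.fromExecutorService(
  Executors.newSingleThreadExecutor())

override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
  Future {
    totalExpectedExecutors.set(requestedTotal)
    true // acknowledged; pod creation and readiness are tracked separately
  }(k8sRequestContext)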
K8s API calls are asynchronous in and of themselves, as long as we're not specifically watching the resources for their readiness. I think it's actually semantically correct to try to request the executors from the API server here. If there's a network timeout or a failure, the state of the Future should reflect that. YarnSchedulerBackend does this as well, in that it queries the application master with a message indicating the desired target number of executors. In client mode this could be a remote call.
There is one difference here, I think: Kubernetes will not refuse to accept new pods even if there are no resources - they'll just go Pending. We need some mechanism in between to ensure we do not request more executors than can be accommodated at the current time.
This is also the case in YARN. The request to the application master only sets the desired number of executors (see this method). The application master will then continuously attempt to fetch these resources in its allocator thread after the initial request.
Thus, even if the cluster does not actually have sufficient resources available, true is still returned by doRequestTotalExecutors in YARN mode.
Ah, I didn't realize the body of doRequestTotalExecutors actually runs in a thread pool provided by the implicit val requestExecutorContext executor.
But the other question still holds: we should return true unless we fail to contact the k8s API.
The last commit is the interesting one, and the 10 seconds between attempted allocations is just an example. I expect that we'll let the user configure (via some new Spark option) how often to try to allocate new executor pods.
Force-pushed from 8db4287 to f342055.
interval: Int,
backend: KubernetesClusterSchedulerBackend) extends Logging {

  private val scheduler = Executors.newScheduledThreadPool(1)
Wondering if this should be a daemon thread. If yes, then use ThreadUtils from the Spark packages to create a "named" daemon thread.
+1 use a daemon thread and use ThreadUtils.
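A minimal sketch using Spark's ThreadUtils (the thread name is illustrative):

import java.util.concurrent.TimeUnit
import org.apache.spark.util.ThreadUtils

// A named single-thread daemon scheduler; being a daemon, it cannot keep
// the JVM alive after the driver shuts down.
private val allocatorExecutor =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")

allocatorExecutor.scheduleWithFixedDelay(allocatorRunnable, 0L, interval, TimeUnit.SECONDS)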
backend: KubernetesClusterSchedulerBackend) extends Logging {

  private val scheduler = Executors.newScheduledThreadPool(1)
  private val allocatorRunnable: Runnable = new Runnable {
instead of using Java Runnable and specifically submitting the task, consider using Scala's ExecutionContext and Future blocks.
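One possible shape of that, as a hedged sketch (the method name is assumed; scheduler is the pool from the diff above):

import scala.concurrent.{ExecutionContext, Future}

// Wrapping the existing scheduler pool in an ExecutionContext lets each
// allocation round be expressed as a Future block instead of a Runnable.
implicit val allocationContext: ExecutionContext =
  ExecutionContext.fromExecutorService(scheduler)

def runAllocationRound(): Future[Unit] = Future {
  // create executor pods up to the current target, as the Runnable body does today
}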
// This is a set of all ready executor pods.
private var readyExecutorPods = Set[String]()

client
Start the watch in start() and keep a reference to it. Close the watch in stop().
Just noticed this isn't part of the scheduler backend, so methods like start() and stop() aren't defined here. But we should be starting and stopping the watch in KubernetesClusterSchedulerBackend#start() and KubernetesClusterSchedulerBackend#stop() analogously.
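A minimal sketch of that lifecycle wiring (the watcher instance and label are assumptions; fabric8's Watch implements Closeable):

import java.io.Closeable

private var executorPodsWatch: Closeable = _

override def start(): Unit = {
  super.start()
  // Keep the handle so the watch can be closed during shutdown.
  executorPodsWatch = kubernetesClient.pods()
    .withLabel("spark-app-id", applicationId()) // hypothetical label
    .watch(executorPodsWatcher) // assumed Watcher[Pod] instance
}

override def stop(): Unit = {
  Option(executorPodsWatch).foreach(_.close())
  super.stop()
}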
 * KubernetesAllocator class watches all the executor pods associated with
 * this SparkJob and creates new executors when it is appropriate.
 */
private[spark] class KubernetesAllocator(
This could be pulled out to a separate class.
Sorry, I meant a separate file.
Actually maybe it doesn't belong in a separate class at all - the reference to a KubernetesClusterSchedulerBackend in this class makes me think this logic actually belongs in the scheduler backend itself.
rerun integration test please
We don't need a separate executor limit here, right? Can we not use the expected number of executors, and then block on the executors we've specifically requested in this round becoming ready? I think it's less clear if we have to track more global state by having both an executor limit and an expected executor count.
Force-pushed from af50dc0 to 39572cf.
Force-pushed from 39572cf to 93a1815.
This is the initial PR (some integration test fixes & scalastyle fixes pending).
I hope to keep the scope of this minimal and more or less what it is now, with future PRs adding:
cc @apache-spark-on-k8s/contributors